
In the ODC and Dask (LocalCluster) notebook we saw how dask can speed up IO and computation by parallelising operations into chunks and tasks, and how delayed tasks and task graph optimisation remove redundant work when results are not used.
Using chunks provides one additional capability beyond parallelisation - the ability to perform computations that are larger than available memory.
Since dask operations are performed on chunks, dask can work on smaller pieces that each fit into memory. This is particularly useful if a large amount of data is being reduced, say by taking a seasonal mean.
As with parallelisation, not all algorithms can be broken into smaller pieces, so this won't always be possible. Dask arrays, though, go a long way towards making it work for a great many operations.
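As a minimal sketch of the idea (using a synthetic dask array, not the satellite data below), a chunked reduction can touch an array far larger than any single piece held in memory, because each chunk is created, reduced and discarded in turn:

```python
import dask.array as da

# A 512 MiB float64 array that is never materialised in full:
# dask holds only a task graph until compute time.
big = da.ones((8192, 8192), chunks=(1024, 1024))

# The mean reduces chunk by chunk, so peak memory per task stays near
# one chunk (1024 * 1024 * 8 bytes = 8 MiB), not the full 512 MiB.
result = big.mean().compute()
print(result)  # 1.0
```

The same mechanism is what lets the 40 GiB load later in this notebook be reduced to a small result on a 29 GiB cluster.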
Firstly, some initial imports...
# EASI tools
import git
import sys, os
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "chile"
We'll continue using the same algorithm as before, but this time we're going to modify its memory usage to exceed the LocalCluster's available memory. This example notebook is set up to run on a compute node with 28 GiB of available memory and 8 cores for the LocalCluster. We'll make that explicit here in case you are blessed with more resources.
Let's start the cluster...
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, threads_per_worker=4)
cluster.scale(n=2, memory="14GiB")
client = Client(cluster)
client
/env/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 34023 instead warnings.warn(
Client-44f6695b-fa48-11ed-97f2-1eb1b782f397
    Connection method: Cluster object    Cluster type: distributed.LocalCluster
    Dashboard: http://127.0.0.1:34023/status

LocalCluster 00ad2e07
    Dashboard: http://127.0.0.1:34023/status    Workers: 2
    Total threads: 8                            Total memory: 29.00 GiB
    Status: running                             Using processes: True

Scheduler-58ee65ae-5cb3-469c-b7ab-5b0a630d9c0d
    Comm: tcp://127.0.0.1:41199                 Workers: 2
    Dashboard: http://127.0.0.1:34023/status    Total threads: 8
    Started: Just now                           Total memory: 29.00 GiB

Worker
    Comm: tcp://127.0.0.1:36641                 Total threads: 4
    Dashboard: http://127.0.0.1:42899/status    Memory: 14.50 GiB
    Nanny: tcp://127.0.0.1:42135
    Local directory: /tmp/dask-worker-space/worker-617vkvrl

Worker
    Comm: tcp://127.0.0.1:34969                 Total threads: 4
    Dashboard: http://127.0.0.1:40065/status    Memory: 14.50 GiB
    Nanny: tcp://127.0.0.1:41973
    Local directory: /tmp/dask-worker-space/worker-pr7mt680
We can monitor memory usage on the workers using the dask dashboard URL below (Status tab). The workers are local, so this will be memory on the same compute node that Jupyter is running on.
dashboard_address = notebook_utils.localcluster_dashboard(client=client,server=easi.hub)
print(dashboard_address)
https://hub.datacubechile.cl/user/jhodge/proxy/34023/status
As we will be using Requester Pays buckets in AWS S3, we need to run the configure_s3_access() function below with the client option to ensure that Jupyter and the cluster have the correct permissions to be able to access the data.
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
import datacube
from datacube.utils import masking
dc = datacube.Datacube()
# Get the centroid of the coordinates in the default configuration
central_lat = sum(easi.latitude)/2
central_lon = sum(easi.longitude)/2
# or set your own by uncommenting and editing the following lines
# central_lat = -42.019
# central_lon = 146.615
# Set the buffer to load around the central coordinates
# This is a radial distance from the central coordinates, so the bounding box spans 2x the buffer in each dimension
buffer = 0.05
# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
# Data products - Landsat 8 ARD from Geoscience Australia
products = easi.product('landsat')
# Set the date range to load data over
set_time = ("2021-01-01", "2021-12-31")
# Set the measurements/bands to load. None will load all of them
measurements = None
# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat') # If defined, else None
set_resolution = easi.resolution('landsat') # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)
group_by = "solar_day"
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":1},
group_by=group_by,
)
dataset
<xarray.Dataset>
Dimensions: (time: 22, y: 381, x: 335)
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:39:19.317361 ... 2021-12...
* y (y) float64 6.692e+06 6.692e+06 ... 6.681e+06 6.681e+06
* x (x) float64 8.572e+05 8.572e+05 ... 8.672e+05 8.672e+05
spatial_ref int32 32718
Data variables:
coastal (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
blue (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
green (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
red (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
nir08 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
swir16 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
swir22 (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_pixel (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_aerosol (time, y, x) uint8 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
qa_radsat (time, y, x) uint16 dask.array<chunksize=(1, 381, 335), meta=np.ndarray>
Attributes:
crs: epsg:32718
grid_mapping: spatial_ref
We can check the total size of the dataset using nbytes. We'll divide by 2**30 to have the result display in gibibytes.
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 0.05
As you can see, this Region of Interest (ROI) and temporal range (1 year) produce a tiny dataset. Let's scale up by increasing the buffer around our ROI.
buffer = 1
# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":1},
group_by=group_by,
)
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 40.49
Okay, this should now be larger than the memory available on our Jupyter node (which you should be able to see at the bottom of your window - probably 28.00 or 29.00 GB). This creates issues for calculation: we need a way to compute the result we want without the machine running out of memory.
Dask can compute many tasks and handle large amounts of data over the course of a series of calculations. Collectively, these calculations might touch more data in total than can fit in RAM; that is fine, but it becomes a problem if the final product itself is too big to fit in RAM. Below we will shape the computation so that the final result fits in RAM, and then use the .compute() function to run all the calculations.
Let's take a look at the memory usage for one of the bands, we'll use red.
dataset.red
<xarray.DataArray 'red' (time: 45, y: 7607, x: 6685)>
dask.array<dc_load_red, shape=(45, 7607, 6685), dtype=uint16, chunksize=(1, 7607, 6685), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:38:55.375489 ... 2021-12...
* y (y) float64 6.8e+06 6.8e+06 6.8e+06 ... 6.572e+06 6.572e+06
* x (x) float64 7.629e+05 7.629e+05 ... 9.633e+05 9.634e+05
spatial_ref int32 32718
Attributes:
units: reflectance
nodata: 0
crs: epsg:32718
grid_mapping: spatial_ref
You can see the year now has more time observations than in the first dataset (45 rather than 22) because we've expanded the area of interest and picked up multiple satellite passes. The spatial dimensions are also much larger.
Take a note of the Chunk Bytes - for this dataset, a (1, 7607, 6685) uint16 chunk is just under 100 MiB. This is the smallest unit of this dataset that dask will do work on. To do an NDVI calculation, dask will need two bands, the mask, the result and a few other temporary variables in memory at once. So while this value is an indicator of the memory required on a worker to perform an operation, it is not the total, which will depend on the operation.
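The per-chunk memory follows directly from the chunk shape and dtype shown in the repr above, so it can be sanity-checked by hand:

```python
import numpy as np

# One chunk of the red band: (time=1, y=7607, x=6685) uint16 values
chunk_shape = (1, 7607, 6685)
itemsize = np.dtype("uint16").itemsize  # 2 bytes per value

chunk_bytes = int(np.prod(chunk_shape)) * itemsize
print(f"{chunk_bytes / 2**20:.1f} MiB per chunk")  # ~97 MiB
```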
We can adjust the amount of memory per chunk further by chunking the spatial dimension. Let's split it into 2048x2048 size pieces.
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling={"qa_pixel": "nearest", "*": "average"},
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":1, "x":2048, "y":2048}, ## Adjust the chunking spatially as well
group_by=group_by,
)
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 40.49
As you can see the total dataset size stays the same.
Look at the red data variable below. You can see the chunk size has reduced to 8 MiB, and there are now many more chunks (720 per variable, compared with 45 previously). This will result in a higher number of Tasks for Dask to work on. This makes sense: smaller chunks, more tasks.
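The chunk count can be read off the printed shapes: one chunk per time step, and the spatial dimensions each split into ceiling-division pieces of 2048:

```python
import math

time_chunks = 45                   # one chunk per time step
y_chunks = math.ceil(7607 / 2048)  # 4 (last one is smaller)
x_chunks = math.ceil(6685 / 2048)  # 4 (last one is smaller)

print(time_chunks * y_chunks * x_chunks)  # 720 chunks per data variable
```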
TIP: The relationship between tasks and chunks is a critical tuning parameter.
Workers have limits in memory and compute capacity. The Dask Scheduler has limits in how many tasks it can manage efficiently (and remember it is tracking all of the data variables, not just this one). The trick with Dask is to give it a good number of chunks of data which aren't too big and don't result in too many tasks. There is always a trade-off and each calculation will be different. Ideally, you want chunks to be aligned with how the data is stored, or how the data is going to be used. If those two things are different, then rechunking can result in large amounts of data needing to be held in the cluster memory, which could result in failures. In the same way, if chunks are too large, they might end up taking up too much memory, causing a crash. This is sometimes down to trial and error.
Later, when we move to a fully remote and distributed cluster, chunks also become an important element in communicating between workers over networks.
If you look carefully at the cube-like diagram in the summary below you will see some internal lines showing the chunk boundaries for the spatial dimensions. 2048 doesn't divide the spatial dimensions evenly, so dask has made the chunks on the edges smaller. The chunk specification is a guide: the actual data - numpy arrays in this case - are cut into pieces of the chunk size or smaller. These are called blocks in dask and represent the actual shape of the numpy array that will be processed.
Somewhat confusingly, the terms blocks and chunks are used interchangeably in the dask literature, and you'll need to check the context to see whether it refers to the specification or to an actual block of data. For the moment this distinction doesn't matter, but when performing low-level custom operations, knowing that your blocks might be a different shape does matter.
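You can see the uneven edge blocks directly on a plain dask array with the same spatial shape as this dataset (a synthetic stand-in, not the loaded data):

```python
import dask.array as da

# Chunk a 7607 x 6685 array into 2048 x 2048 pieces: dask keeps the
# requested size where it fits and makes smaller blocks at the edges.
arr = da.zeros((7607, 6685), chunks=(2048, 2048))

print(arr.chunks)
# ((2048, 2048, 2048, 1463), (2048, 2048, 2048, 541))
```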
dataset.red
<xarray.DataArray 'red' (time: 45, y: 7607, x: 6685)>
dask.array<dc_load_red, shape=(45, 7607, 6685), dtype=uint16, chunksize=(1, 2048, 2048), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2021-01-03T14:38:55.375489 ... 2021-12...
* y (y) float64 6.8e+06 6.8e+06 6.8e+06 ... 6.572e+06 6.572e+06
* x (x) float64 7.629e+05 7.629e+05 ... 9.633e+05 9.634e+05
spatial_ref int32 32718
Attributes:
units: reflectance
nodata: 0
crs: epsg:32718
grid_mapping: spatial_ref
We won't worry too much about tuning these parameters right now and instead will focus on processing this larger dataset. As before, we can exploit dask's ability to use delayed tasks and apply our masking and NDVI directly to the full dataset. We'll also add an unweighted seasonal mean calculation using groupby("time.season").mean("time"). Dask will seek to complete the reductions (by chunk) first as they reduce memory usage.
It's probably worth monitoring the dask cluster memory usage via the dashboard's Workers Memory view to see just how little RAM is actually used during this calculation, despite it being performed on a large dataset.
print(dashboard_address)
https://hub.datacubechile.cl/user/jhodge/proxy/34023/status
We will now calculate NDVI and group the results by season:
# Identify pixels that don't have cloud, cloud shadow or water
from datacube.utils import masking
good_pixel_flags = {
'nodata': False,
'cloud': 'not_high_confidence',
'cloud_shadow': 'not_high_confidence',
'water': 'land_or_cloud'
}
cloud_free_mask = masking.make_mask(dataset['qa_pixel'], **good_pixel_flags)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free.nir08 - cloud_free.red
band_sum = cloud_free.nir08 + cloud_free.red
# Calculate NDVI from the band components
ndvi = None
ndvi = band_diff / band_sum
ndvi_unweighted = ndvi.groupby("time.season").mean("time") # Calculate the seasonal mean
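The seasonal grouping pattern above can be sketched on synthetic data (a hypothetical monthly series over a tiny grid, not the Landsat dataset):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic monthly "NDVI" series over a 2x2 grid for one year
time = pd.date_range("2021-01-01", periods=12, freq="MS")
ndvi = xr.DataArray(
    np.random.rand(12, 2, 2),
    dims=("time", "y", "x"),
    coords={"time": time},
)

# Group by meteorological season and average over time, as above
seasonal = ndvi.groupby("time.season").mean("time")

print(seasonal.shape)                               # (4, 2, 2)
print(sorted(seasonal.season.values.tolist()))      # ['DJF', 'JJA', 'MAM', 'SON']
```

The time dimension collapses to a season dimension of length 4, exactly as in the real result checked below.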
Let's check the shape of our result - it should have 4 seasons now instead of the individual dates.
ndvi_unweighted
<xarray.DataArray (season: 4, y: 7607, x: 6685)>
dask.array<stack, shape=(4, 7607, 6685), dtype=float64, chunksize=(1, 2048, 2048), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 6.8e+06 6.8e+06 6.8e+06 ... 6.572e+06 6.572e+06
* x (x) float64 7.629e+05 7.629e+05 ... 9.633e+05 9.634e+05
spatial_ref int32 32718
* season   (season) object 'DJF' 'JJA' 'MAM' 'SON'
Before we do the compute() to get our result, we should make sure the final result will fit in memory for the Jupyter kernel.
print(f"dataset size (GiB) {ndvi_unweighted.nbytes / 2**30:.2f}")
dataset size (GiB) 1.52
This shows that the resulting data should be around 1.5 GiB, which will fit in local memory.
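That figure can be cross-checked by hand: the NDVI division promotes the uint16 inputs to float64 (8 bytes per value), and the result has 4 seasons over the full spatial grid:

```python
# 4 seasons x 7607 x 6685 float64 values
size_gib = 4 * 7607 * 6685 * 8 / 2**30
print(f"{size_gib:.2f} GiB")  # 1.52 GiB
```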
If you are monitoring the cluster when you run the cell below, you might notice a delay between running the cell and actual computation occurring. Dask performs a task graph optimisation step on the client, not the cluster; how long this takes depends on the number of tasks and the complexity of the graph. Recent Dask updates have made this step considerably faster. We'll talk more about this later.
In the meantime, run the next cell and watch dask compute the result without running out of memory. You might notice that your cluster spills some data to disk (the grey part of the bars in the Bytes stored per worker graph). This is not normally desirable and slows down the calculation (because reading and writing to/from the disk is slower than to/from RAM), but it is a mechanism used by Dask to help manage large calculations.
Tip: don't forget to look at your Dask Dashboard (URL a few cells above) to watch what is happening in your cluster
actual_result = ndvi_unweighted.compute()
/env/lib/python3.10/site-packages/rasterio/warp.py:344: NotGeoreferencedWarning: Dataset has no geotransform, gcps, or rpcs. The identity matrix will be returned.
  _reproject(
To avoid northern/southern hemisphere differences, the season values are represented as acronyms of the months that make them up: DJF (Dec-Feb), MAM (Mar-May), JJA (Jun-Aug) and SON (Sep-Nov).
Let's plot the result for DJF. This will take a few seconds, the image is several thousand pixels across.
actual_result.sel(season='DJF').plot(robust=True, size=6, aspect='equal')
<matplotlib.collections.QuadMesh at 0x7fe725bf8c40>
Not the most useful visualisation as a small image, and a little slow. Dask can help with this too, but that's a topic for another notebook. There are many other ways to work with Dask and optimise performance; this is just the beginning of how to manage large calculations.
client.close()
cluster.close()